package cn.ways;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class Kafka2Es {

    private static final Logger log = LoggerFactory.getLogger(Kafka2Es.class);

    // --- Job configuration (TODO: externalize via program args / ParameterTool) ---
    private static final String KAFKA_BOOTSTRAP_SERVERS = "kube.ways.cn:5792";
    private static final String KAFKA_TOPIC = "test_kafka";
    private static final String KAFKA_GROUP_ID = "test";
    private static final String ES_HOST = "10.10.22.206";
    private static final int ES_PORT = 30920;
    private static final String ES_INDEX = "test-kafka";
    // NOTE(security): credentials are hard-coded; move them to environment
    // variables or a secrets store before running outside a local test setup.
    private static final String ES_USER = "elastic";
    private static final String ES_PASSWORD = "abc.123";

    /**
     * Flink job: reads string records from a Kafka topic and indexes each one
     * into Elasticsearch as a document of the form {@code {"data": <record>}}.
     *
     * <p>Local smoke-test producer:
     * /opt/bitnami/kafka/bin/kafka-console-producer.sh --topic test_kafka --broker-list localhost:9092
     *
     * @param args unused
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {
        log.debug("start- debug");
        log.info("start- info");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(30000);
        // Event time is the default since Flink 1.12; the deprecated
        // setStreamTimeCharacteristic(TimeCharacteristic.EventTime) call was
        // removed as a no-op (this job uses noWatermarks() anyway).

        DataStreamSource<String> stream = env.fromSource(
                buildKafkaSource(), WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        stream.addSink(buildEsSink());

        env.execute("Kafka Flink");
    }

    /** Builds the Kafka source reading the configured topic from the earliest offset. */
    private static KafkaSource<String> buildKafkaSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
                .setTopics(KAFKA_TOPIC)
                .setGroupId(KAFKA_GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    /** Builds the Elasticsearch sink that indexes each record under the "data" field. */
    private static ElasticsearchSink<String> buildEsSink() {
        List<HttpHost> httpHostList =
                Collections.singletonList(new HttpHost(ES_HOST, ES_PORT, "http"));

        ElasticsearchSink.Builder<String> esBuilder = new ElasticsearchSink.Builder<>(
                httpHostList,
                new ElasticsearchSinkFunction<String>() {
                    private IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        return Requests.indexRequest()
                                .index(ES_INDEX)
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                });

        // Flush after every element so records show up in ES immediately.
        // Demo setting only — raise this for throughput in production.
        esBuilder.setBulkFlushMaxActions(1);

        // Attach basic-auth credentials to the internally created REST client.
        esBuilder.setRestClientFactory(restClientBuilder ->
                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(
                            AuthScope.ANY, new UsernamePasswordCredentials(ES_USER, ES_PASSWORD));
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }));

        // BUG FIX: the original handler logged a bare "fail", discarding both the
        // cause and the failed request. Log full details; still swallow the failure
        // (no rethrow) to preserve the original keep-the-job-running behavior.
        esBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest actionRequest, Throwable throwable,
                                  int restStatusCode, RequestIndexer requestIndexer) throws Throwable {
                log.error("Elasticsearch request failed (restStatusCode={}, request={})",
                        restStatusCode, actionRequest, throwable);
            }
        });

        return esBuilder.build();
    }
}